activiti和设计模式(7)

一个具体的流程启动以后,具体流程示意图:
简单会签流程示意图

追踪流程的运转的情况,找到好玩的代码。
根据状态机的跳转的动作:

PROCESS_START = new AtomicOperationProcessStart
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();

我们看到第一个的开始节点的执行:
开始节点的执行

public class NoneStartEventActivityBehavior extends FlowNodeActivityBehavior {

  // Nothing to see here.
  // The default behaviour of the BpmnActivity is exactly what
  // a none start event should be doing.
}

实现的在FlowNodeActivityBehavior中

protected BpmnActivityBehavior bpmnActivityBehavior = new BpmnActivityBehavior();

 /**
  * Default behaviour: just leave the activity with no extra functionality.
  */
 public void execute(ActivityExecution execution) throws Exception {
   leave(execution);
 }

 /**
  * Default way of leaving a BPMN 2.0 activity: evaluate the conditions on the
  * outgoing sequence flow and take those that evaluate to true.
  */
 protected void leave(ActivityExecution execution) {
   bpmnActivityBehavior.performDefaultOutgoingBehavior(execution);
 }

 protected void leaveIgnoreConditions(ActivityExecution activityContext) {
   bpmnActivityBehavior.performIgnoreConditionsOutgoingBehavior(activityContext);
 }

默认的bpmnActivityBehavior沿着默认的线达到下一个节点,是吗?

protected void performOutgoingBehavior(ActivityExecution execution,
         boolean checkConditions, boolean throwExceptionIfExecutionStuck, List<ActivityExecution> reusableExecutions) {

   if (log.isDebugEnabled()) {
     log.debug("Leaving activity '{}'", execution.getActivity().getId());
   }

   String defaultSequenceFlow = (String) execution.getActivity().getProperty("default");//说明①
   List<PvmTransition> transitionsToTake = new ArrayList<PvmTransition>();

   List<PvmTransition> outgoingTransitions = execution.getActivity().getOutgoingTransitions();
   for (PvmTransition outgoingTransition : outgoingTransitions) {
     Expression skipExpression = outgoingTransition.getSkipExpression();
    //如果不是蹦跳的skipExpression
     if (!SkipExpressionUtil.isSkipExpressionEnabled(execution, skipExpression)) {
       // 没有默认线或者出去的线不是默认线
       if (defaultSequenceFlow == null || !outgoingTransition.getId().equals(defaultSequenceFlow)) {
         Condition condition = (Condition) outgoingTransition.getProperty(BpmnParse.PROPERTYNAME_CONDITION);
         //线上没有条件或者,限定了不检查线上条件,或者线上条件符合要求
         if (condition == null || !checkConditions || condition.evaluate(outgoingTransition.getId(), execution)) {
           transitionsToTake.add(outgoingTransition);
         }
       }

     } else if (SkipExpressionUtil.shouldSkipFlowElement(execution, skipExpression)){
       transitionsToTake.add(outgoingTransition);
     }
   }
   //说明②
   if (transitionsToTake.size() == 1) {
     //只有一条线的时候,直接的走take逻辑
     execution.take(transitionsToTake.get(0));
   } else if (transitionsToTake.size() >= 1) {
     //有多条线的时候
     execution.inactivate();
     if (reusableExecutions == null || reusableExecutions.isEmpty()) {
       execution.takeAll(transitionsToTake, Arrays.asList(execution));
     } else {
       execution.takeAll(transitionsToTake, reusableExecutions);
     }

   } else {
    //没有满足条件出去的线,如果有默认的线
     if (defaultSequenceFlow != null) {
       PvmTransition defaultTransition = execution.getActivity().findOutgoingTransition(defaultSequenceFlow);
       if (defaultTransition != null) {
         execution.take(defaultTransition);
       } else {
         throw new ActivitiException("Default sequence flow '" + defaultSequenceFlow + "' could not be not found");
       }
     } else {
       //如果有补偿事件
       Object isForCompensation = execution.getActivity().getProperty(BpmnParse.PROPERTYNAME_IS_FOR_COMPENSATION);
       if(isForCompensation != null && (Boolean) isForCompensation) {
         if (execution instanceof ExecutionEntity) {
           Context.getCommandContext().getHistoryManager().recordActivityEnd((ExecutionEntity) execution);
         }
         InterpretableExecution parentExecution = (InterpretableExecution) execution.getParent();
         ((InterpretableExecution)execution).remove();
         parentExecution.signal("compensationDone", null);

       } else {
         //暂停了,抛出错误
         if (log.isDebugEnabled()) {
           log.debug("No outgoing sequence flow found for {}. Ending execution.", execution.getActivity().getId());
         }
         execution.end();

         if (throwExceptionIfExecutionStuck) {
           throw new ActivitiException("No outgoing sequence flow of the inclusive gateway '" + execution.getActivity().getId()
                 + "' could be selected for continuing the process");
         }
       }

     }
   }
 }

流程定义节点之间的转移,是先找到满足条件的离开的线:

public void take(PvmTransition transition) {
  	take(transition, true);
  }

  public void take(PvmTransition transition, boolean fireActivityCompletionEvent) {

  	if (fireActivityCompletionEvent) {
	  	fireActivityCompletedEvent();
  	}

    if (this.transition!=null) {
      throw new PvmException("already taking a transition");
    }
    if (transition==null) {
      throw new PvmException("transition is null");
    }
    setActivity((ActivityImpl)transition.getSource());
    setTransition((TransitionImpl) transition);
    performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_END);
  }

说的转移,只是同一个ExecutionEntity里面的 Activity修改了,Transition修改了,然后开始执行:TRANSITION_NOTIFY_LISTENER_END. 但是和我们想象的不一样,这里设置的Activiti是Transition的Source,也就是“刚刚执行结束”的活动定义:

Execution设置activity

PROCESS_START = new AtomicOperationProcessStart
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
||
TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();

执行的过程:

 public void execute(InterpretableExecution execution) {
    ScopeImpl scope = getScope(execution);
    List<ExecutionListener> exectionListeners = scope.getExecutionListeners(getEventName());
    int executionListenerIndex = execution.getExecutionListenerIndex();
    
    if (exectionListeners.size()>executionListenerIndex) {
      execution.setEventName(getEventName());
      execution.setEventSource(scope);
      ExecutionListener listener = exectionListeners.get(executionListenerIndex);
      try {
        listener.notify(execution);
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
      }
      execution.setExecutionListenerIndex(executionListenerIndex+1);
      execution.performOperation(this);//说明①

    } else {
      execution.setExecutionListenerIndex(0);
      execution.setEventName(null);
      execution.setEventSource(null);
      eventNotificationsCompleted(execution);
    }
  }
  
  @Override
  protected void eventNotificationsCompleted(InterpretableExecution execution) {
    execution.performOperation(TRANSITION_DESTROY_SCOPE);
  }

说明①: 这个是一个循环调用的,为了循环的发送监听事件 现在运行到了:TRANSITION_DESTROY_SCOPE

||
TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
public class AtomicOperationTransitionDestroyScope implements AtomicOperation

说明:TRANSITION_DESTROY_SCOPE直接实现了接口,没有像TRANSITION_NOTIFY_LISTENER_END一样,在抽象类的基础方面实现。这可能也是为什么有了接口,还有抽象类的原因?

public void execute(InterpretableExecution execution) {
    InterpretableExecution propagatingExecution = null;

    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    // if this transition is crossing a scope boundary
    if (activity.isScope()) {//当前的任务是否是一个包含的活动节点
      
      InterpretableExecution parentScopeInstance = null;
      // if this is a concurrent execution crossing a scope boundary
      if (execution.isConcurrent() && !execution.isScope()) {
        // first remove the execution from the current root
        InterpretableExecution concurrentRoot = (InterpretableExecution) execution.getParent();
        parentScopeInstance = (InterpretableExecution) execution.getParent().getParent();

        log.debug("moving concurrent {} one scope up under {}", execution, parentScopeInstance);
        List<InterpretableExecution> parentScopeInstanceExecutions = (List<InterpretableExecution>) parentScopeInstance.getExecutions();
        List<InterpretableExecution> concurrentRootExecutions = (List<InterpretableExecution>) concurrentRoot.getExecutions();
        // if the parent scope had only one single scope child
        if (parentScopeInstanceExecutions.size()==1) {
          // it now becomes a concurrent execution
          parentScopeInstanceExecutions.get(0).setConcurrent(true);
        }
        
        concurrentRootExecutions.remove(execution);
        parentScopeInstanceExecutions.add(execution);
        execution.setParent(parentScopeInstance);
        execution.setActivity(activity);
        propagatingExecution = execution;
        
        // if there is only a single concurrent execution left
        // in the concurrent root, auto-prune it.  meaning, the 
        // last concurrent child execution data should be cloned into
        // the concurrent root.   
        if (concurrentRootExecutions.size()==1) {
          InterpretableExecution lastConcurrent = concurrentRootExecutions.get(0);
          if (lastConcurrent.isScope()) {
            lastConcurrent.setConcurrent(false);
            
          } else {
            log.debug("merging last concurrent {} into concurrent root {}", lastConcurrent, concurrentRoot);
            
            // We can't just merge the data of the lastConcurrent into the concurrentRoot.
            // This is because the concurrent root might be in a takeAll-loop.  So the 
            // concurrent execution is the one that will be receiving the take
            concurrentRoot.setActivity((ActivityImpl) lastConcurrent.getActivity());
            concurrentRoot.setActive(lastConcurrent.isActive());
            lastConcurrent.setReplacedBy(concurrentRoot);
            lastConcurrent.remove();
          }
        }

      } else if (execution.isConcurrent() && execution.isScope()) {
        log.debug("scoped concurrent {} becomes concurrent and remains under {}", execution, execution.getParent());

        // TODO!
        execution.destroy();
        propagatingExecution = execution;
        
      } else {
        propagatingExecution = (InterpretableExecution) execution.getParent();
        propagatingExecution.setActivity((ActivityImpl) execution.getActivity());
        propagatingExecution.setTransition(execution.getTransition());
        propagatingExecution.setActive(true);
        log.debug("destroy scope: scoped {} continues as parent scope {}", execution, propagatingExecution);
        execution.destroy();
        execution.remove();
      }
      
    } else {
      propagatingExecution = execution;
    }
    
    // if there is another scope element that is ended
    // 父级别的元素,普通元素的父元素是流程定义ProcessDefinition
    ScopeImpl nextOuterScopeElement = activity.getParent();
    TransitionImpl transition = propagatingExecution.getTransition();
    // 目标节点
    ActivityImpl destination = transition.getDestination();
    //判断目标节点是否在父节点中,如果不在(考虑到子流程结束后的下一个元素是子流程以外的元素了)
    if (transitionLeavesNextOuterScope(nextOuterScopeElement, destination)) {
      propagatingExecution.setActivity((ActivityImpl) nextOuterScopeElement);
      //再次进入:TRANSITION_NOTIFY_LISTENER_END
      propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_END);
    } else {
    //进入:TRANSITION_NOTIFY_LISTENER_TAKE
      propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_TAKE);
    }
  }

从最后的逻辑,我们可以看到一个循环:

PROCESS_START = new AtomicOperationProcessStart
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE = new AtomicOperationActivityExecute();
||
TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
||
TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
||
TRANSITION_NOTIFY_LISTENER_TAKE,TRANSITION_NOTIFY_LISTENER_END(第二种情况就是再次的循环一次)

TRANSITION_NOTIFY_LISTENER_TAKE 这个就是对应线的逻辑了:

TRANSITION_NOTIFY_LISTENER_TAKE = new AtomicOperationTransitionNotifyListenerTake();

执行逻辑:


  public void execute(InterpretableExecution execution) {
    TransitionImpl transition = execution.getTransition();
    
    List<ExecutionListener> executionListeners = transition.getExecutionListeners();
    int executionListenerIndex = execution.getExecutionListenerIndex();
    
    if (executionListeners.size()>executionListenerIndex) {
      execution.setEventName(org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_TAKE);
      execution.setEventSource(transition);
      ExecutionListener listener = executionListeners.get(executionListenerIndex);
      try {
        listener.notify(execution);
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
      }
      execution.setExecutionListenerIndex(executionListenerIndex+1);
      execution.performOperation(this);
      //上面的这部分逻辑和抽象类里面的逻辑基本的一样
    } else {
    	if (log.isDebugEnabled()) {
    		log.debug("{} takes transition {}", execution, transition);
    	}
      execution.setExecutionListenerIndex(0);
      execution.setEventName(null);
      execution.setEventSource(null);

      ActivityImpl activity = (ActivityImpl) execution.getActivity();
      ActivityImpl nextScope = findNextScope(activity.getParent(), transition.getDestination());
      execution.setActivity(nextScope);//到这里ExecutionEntity的activity才再次的修改,但是Transition并没有修改
      
      // Firing event that transition is being taken     	。。。。。。。。      
      execution.performOperation(TRANSITION_CREATE_SCOPE);
    }
  }

然后就到了:TRANSITION_CREATE_SCOPE

TRANSITION_CREATE_SCOPE = new AtomicOperationTransitionCreateScope();

执行逻辑:

public void execute(InterpretableExecution execution) {
    InterpretableExecution propagatingExecution = null;
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    if (activity.isScope()) {//activity扩展自Scope
      // 重新创建了一个Execution,称之为: propagatingExecution
      // 该propagatingExecution的Transition仍然是:(startevent1)--flow1-->(usertask1)
      //并不是Null
      propagatingExecution = (InterpretableExecution) execution.createExecution();
      propagatingExecution.setActivity(activity);
      propagatingExecution.setTransition(execution.getTransition());
      
      execution.setTransition(null);
      execution.setActivity(null);
      execution.setActive(false);
      // Execution的activity,Transition全部都是null
      log.debug("create scope: parent {} continues as execution {}", execution, propagatingExecution);
      propagatingExecution.initialize();

    } else {
      propagatingExecution = execution;
    }
   propagatingExecution.performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_START);
  }

到了:AtomicOperation.TRANSITION_NOTIFY_LISTENER_START
public class AtomicOperationTransitionNotifyListenerStart extends AbstractEventAtomicOperation
扩展自抽象类:AbstractEventAtomicOperation 具体的执行的过程是:

  @Override
  protected void eventNotificationsCompleted(InterpretableExecution execution) {
    TransitionImpl transition = execution.getTransition();
    ActivityImpl destination = null;
    if(transition == null) { // this is null after async cont. -> transition is not stored in execution
      destination = (ActivityImpl) execution.getActivity();
    } else {
      destination = transition.getDestination();
    }    
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    if (activity!=destination) {
      ActivityImpl nextScope = AtomicOperationTransitionNotifyListenerTake.findNextScope(activity, destination);
      execution.setActivity(nextScope);
      //进入了scope,创建scope
      execution.performOperation(TRANSITION_CREATE_SCOPE);
    } else {
      //在这个地方设置了Transition为null
      execution.setTransition(null);
      //终于设置了目标activity
      execution.setActivity(destination);
      execution.performOperation(ACTIVITY_EXECUTE);
    }
  }

所以总体的循环就是:

PROCESS_START = new AtomicOperationProcessStart
||
PROCESS_START_INITIAL = new AtomicOperationProcessStartInitial();
||
ACTIVITY_EXECUTE(执行) = new AtomicOperationActivityExecute();
||
TRANSITION_NOTIFY_LISTENER_END = new AtomicOperationTransitionNotifyListenerEnd();
||
TRANSITION_DESTROY_SCOPE = new AtomicOperationTransitionDestroyScope();
||
TRANSITION_NOTIFY_LISTENER_TAKE,TRANSITION_NOTIFY_LISTENER_END(第二种情况就是再次的循环一次)
||
TRANSITION_CREATE_SCOPE(创建) = new AtomicOperationTransitionCreateScope();
||
TRANSITION_NOTIFY_LISTENER_START = new AtomicOperationTransitionNotifyListenerStart();
||
TRANSITION_CREATE_SCOPE(再次创建),ACTIVITY_EXECUTE(再次进入)

这个就是节点转移的全过程:

转移的过程主要的逻辑点: 1.拿到目标节点(dedestination),Scope就是包含活动节点之类的元素(子流程)开始和结束的时候的处理。
2.转移过程中监听事件的发送。
3.节点的执行,根据各个节点的Behavior运行。 选择那条线,以及流程定义中不同节点的判断全部的放在了Behavior中了

Powered by andiHappy and Theme by AndiHappy